package day03.window;

import beans.SensorReading;
import custom.MyCountAggregateFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.*;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Flink 流处理 API - TimeWindow
 * <p>
 * 说明：窗口分配器 -> window()
 * <p>
 * 。window() 方法必须在 keyBy 之后才能用
 *
 * @author lvbingbing
 * @date 2022-01-01 13:05
 */
public class FlinkWindow01 {
    public static void main(String[] args) throws Exception {
        // 1、创建 FlinkWindow00 对象，有参构造会初始化 env，并从socket文本流中读取数据
        int parallelism = 1;
        FlinkWindow00 flinkWindow = new FlinkWindow00(parallelism);
        // 2、获取可执行环境
        StreamExecutionEnvironment env = flinkWindow.getEnv();
        // 3、学习处理时间窗口
        studyProcessingTimeWindow(flinkWindow.getSingleOutputStreamOperator());
        // 4、触发程序执行
        env.execute();
    }

    /**
     * 学习处理时间窗口
     * <p>
     * 1、滚动窗口(Tumbling Window)
     * 2、滑动窗口(Sliding Window)
     * 3、会话窗口(Session Window)
     * <p>
     * 默认时间特性为：处理时间
     * <p>
     * 事件时间窗口见：studyEventTimeWindow
     *
     * @param sensorReadingStream <br>
     * @see StreamExecutionEnvironment#DEFAULT_TIME_CHARACTERISTIC
     * @see FlinkWindow05#studyEventTimeWindow(org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator)
     */
    private static void studyProcessingTimeWindow(SingleOutputStreamOperator<SensorReading> sensorReadingStream) {
        // 1、Tumbling Window
        studyTumblingWindow(sensorReadingStream);
        // 2、Sliding Window
        studySlidingWindow(sensorReadingStream);
        // 3、Session Window
        studySessionWindow(sensorReadingStream);
    }

    /**
     * 滚动窗口
     * <p>
     * 说明：滚动窗口分为：① 事件时间滚动窗口；② 处理时间滚动窗口
     * <p>
     * 默认为处理时间滚动窗口。
     *
     * @param sensorReadingStream <br>
     * @see TimeCharacteristic
     */
    private static void studyTumblingWindow(SingleOutputStreamOperator<SensorReading> sensorReadingStream) {
        // 方式一：使用 timeWindow() 设置滚动窗口，默认为处理时间滚动窗口
        DataStream<Integer> resStream = sensorReadingStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .aggregate(new MyCountAggregateFunction());
        resStream.print("使用timeWindow()设置滚动窗口");
        // 方式二：使用 window() 设置滚动窗口(处理时间滚动窗口)
        SingleOutputStreamOperator<Integer> tumblingProcessTimeWindowDs = sensorReadingStream.keyBy("id")
                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
                .aggregate(new MyCountAggregateFunction());
        tumblingProcessTimeWindowDs.print("处理时间滚动窗口");
    }

    /**
     * 滑动窗口
     * <p>
     * 说明：滑动窗口分为：① 事件时间滑动窗口；② 处理时间滑动窗口
     * <p>
     * 默认为处理时间滑动窗口。
     *
     * @param sensorReadingStream <br>
     */
    private static void studySlidingWindow(SingleOutputStreamOperator<SensorReading> sensorReadingStream) {
        // 方式一：使用 timeWindow() 设置滑动窗口，默认为处理时间滑动窗口
        SingleOutputStreamOperator<Integer> resStream = sensorReadingStream.keyBy("id")
                .timeWindow(Time.seconds(15), Time.seconds(5))
                .aggregate(new MyCountAggregateFunction());
        resStream.print("使用 timeWindow() 设置滑动窗口");
        // 方式二：使用 window() 设置滑动窗口(处理时间滑动窗口)
        SingleOutputStreamOperator<Integer> slidingProcessTimeWindowDs = sensorReadingStream.keyBy("id")
                .window(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(15)))
                .aggregate(new MyCountAggregateFunction());
        slidingProcessTimeWindowDs.print("处理时间滑动窗口");
    }

    /**
     * 会话窗口
     * <p>
     * 说明：会话窗口分为：① 事件时间会话窗口；② 处理时间会话窗口
     *
     * @param sensorReadingStream <br>
     */
    private static void studySessionWindow(SingleOutputStreamOperator<SensorReading> sensorReadingStream) {
        // 使用 window() 设置会话窗口(处理时间会话窗口)
        SingleOutputStreamOperator<Integer> processTimeSessionWindowsDs = sensorReadingStream.keyBy("id")
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
                .aggregate(new MyCountAggregateFunction());
        processTimeSessionWindowsDs.print("处理时间会话窗口");
    }
}
